e9f3506073f7784d2341d939d2ed372b7d56c88d,conduit-worker/src/main/java/com/inmobi/conduit/purge/DataPurgerService.java,DataPurgerService,getStreamsPathToPurge,#Map#boolean#,307
Before Change
String streamName = entry.getKey();
Path streamRootPath = entry.getValue();
String tableName = null;
if (isLocal) {
tableName = LOCAL_TABLE_PREFIX + "_" + streamName;
} else {
tableName = TABLE_PREFIX + "_" + streamName;
}
LOG.debug("Find Paths to purge for stream [" + streamName
+ "] streamRootPath [" + streamRootPath + "]");
// For each Stream, all years
FileStatus[] years = getAllFilesInDir(streamRootPath, fs);
if (years != null) {
for (FileStatus year : years) {
String yearVal = year.getPath().getName();
// For each month
FileStatus[] months = getAllFilesInDir(year.getPath(), fs);
if (months != null && months.length >= 1) {
for (FileStatus month : months) {
String monthVal = month.getPath().getName();
// For each day
FileStatus[] days = getAllFilesInDir(month.getPath(), fs);
if (days != null && days.length >= 1) {
for (FileStatus day : days) {
String dayVal = day.getPath().getName();
// For each hour
FileStatus[] hours = getAllFilesInDir(day.getPath(), fs);
if (hours != null && hours.length >= 1) {
for (FileStatus hour : hours) {
LOG.debug("Working for hour [" + hour.getPath() + "]");
String hourVal = hour.getPath().getName();
Calendar streamDate = CalendarHelper.getDateHour(yearVal,
monthVal, dayVal, hourVal);
LOG.debug("Validate [" + streamDate.toString()
+ "] against retentionHours ["
+ getRetentionPeriod(streamName) + "]");
if (isPurge(streamDate, getRetentionPeriod(streamName))) {
LOG.debug("Adding stream to purge [" + hour.getPath()
+ "]");
Path hourPath = hour.getPath().makeQualified(fs);
addPartitionToList(streamName, tableName, hourPath, yearVal,
monthVal, dayVal, hourVal);
streamsToPurge.add(hourPath);
}
}
} else {
After Change
if (isPurge(streamDate, getRetentionPeriod(streamName))) {
LOG.debug("Adding stream to purge [" + hour.getPath()
+ "]");
streamsToPurge.add(hour.getPath().makeQualified(fs));
}
}
} else {